Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose arguments on consume API #46

Merged
merged 1 commit into from
Jan 29, 2024

Conversation

beano
Copy link
Contributor

@beano beano commented Jan 20, 2024

I was trying to use amqp-client to work with rabbitmq streams (https://www.rabbitmq.com/streams.html) but was unable to set arguments on the consume function. The rabbitmq docs shows that this is supported by passing arguments to consume but amqp-client doesn't currently expose this capability.

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-offset", "first"), // "first" offset specification
  (consumerTag, message) -> {
    // message processing
    // ...
   channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });

@andersfugmann
Copy link
Owner

Thanks for adding this.

The semantics around stream are somewhat different to queues - both when declaring and consuming.
Do you think it would make sense to create an new Stream library to handle decleration and consumption of queues to enforce stream semantics (e.g.. exclusive consumption is not allowed, need to set per consumer QoS et. al.)

I think it could be implemented as a thin wrapper around the Queue module to enforce Stream semantics

Copy link
Owner

@andersfugmann andersfugmann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@andersfugmann andersfugmann merged commit 6813dda into andersfugmann:master Jan 29, 2024
4 checks passed
@michaelklishin
Copy link

Team RabbitMQ develops separate clients for streams. The stream protocol is completely different, and while consuming from streams using an AMQP 0-9-1 is possible, it was never meant to be more than a last resort compatibility measure.

@andersfugmann
Copy link
Owner

I did see that there is a different binary protocol for this. Do you know if there is a machine readable spec for the protocol to ease auto-generation of the protocol and primitives as there is for the amqp protocol? I could only find a human readable definition.

@michaelklishin
Copy link

michaelklishin commented Jan 31, 2024

Team RabbitMQ no longer considers the "one true code generator" a sensible approach to client library development, so there is only the protocol spec and several clients to use as examples.

@beano
Copy link
Contributor Author

beano commented Feb 11, 2024

I did look at the native streaming protocol but it was going to be a fairly significant piece of work to implement and test. I wanted to experiment with RabbitMQ streaming via OCaml and this looked like a quick , supported way to get started and it only required a small, backwards-compatible diff. I also felt that it might be reasonable to expose the consume options because they seem to be part of the public API in the AMQP spec.

Thanks very much for accepting the MR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants